package com.bw.yk01;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Flink sink that writes each {@link YK8Result} into the ClickHouse table {@code yk8}
 * through a single JDBC connection and a reusable prepared statement.
 *
 * <p>The connection and statement are created in {@link #open(Configuration)} and released
 * in {@link #close()}. Every call to {@link #invoke(YK8Result, Context)} binds the eleven
 * statement parameters from the record and executes one insert; auto-commit is enabled, so
 * no explicit commit is issued.
 *
 * <p>NOTE(review): one insert per record is executed as-is; if throughput matters, batching
 * could be considered, but that would change delivery timing and is not done here.
 */
public class DwsClickhouseSink extends RichSinkFunction<YK8Result> {

    private static final String DRIVER_CLASS = "ru.yandex.clickhouse.ClickHouseDriver";
    private static final String JDBC_URL = "jdbc:clickhouse://hadoop-single:8123";
    private static final String JDBC_USER = "default";
    private static final String JDBC_PASSWORD = "";
    private static final String INSERT_SQL =
            "insert into yk8 values(toDateTime(?),toDateTime(?),?,?,?,?,?,?,?,?,?)";

    // JDBC handles are runtime-only state created in open(); marked transient so they are
    // never considered part of the serialized function object.
    transient Connection conn = null;
    transient PreparedStatement ps = null;

    /**
     * Loads the ClickHouse JDBC driver, opens the connection and prepares the insert
     * statement.
     *
     * @param parameters Flink configuration passed by the runtime; not read by this sink
     * @throws Exception if the driver class cannot be loaded, or the connection or statement
     *                   cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(DRIVER_CLASS);
        conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
        try {
            ps = conn.prepareStatement(INSERT_SQL);
            conn.setAutoCommit(true);
        } catch (Exception e) {
            // Do not leak the freshly opened connection when initialisation fails half-way.
            try {
                close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Releases the prepared statement and then the connection.
     *
     * <p>Safe to call when {@link #open(Configuration)} never ran or failed part-way
     * (either field may still be {@code null}), and safe to call more than once. The
     * connection is closed even if closing the statement throws.
     *
     * @throws Exception if closing the statement or the connection fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            ps = null;
            if (conn != null) {
                try {
                    conn.close();
                } finally {
                    conn = null;
                }
            }
        }
    }

    /**
     * Binds the fields of {@code value} to the insert statement and executes it.
     *
     * <p>Parameters 1 and 2 (window start/end strings) are wrapped in {@code toDateTime(...)}
     * by the SQL text; the remaining parameters are bound positionally.
     *
     * @param value   the record to insert
     * @param context sink context supplied by Flink; not used
     * @throws Exception if binding a parameter or executing the insert fails
     */
    @Override
    public void invoke(YK8Result value, Context context) throws Exception {
        ps.setString(1, value.getStt());
        ps.setString(2, value.getEdt());
        ps.setString(3, value.getSku_id());
        ps.setString(4, value.getSku_name());
        ps.setString(5, value.getProvince_id());
        ps.setString(6, value.getProvince_name());
        ps.setString(7, value.getUser_id());
        ps.setLong(8, value.getOrder_sku_num());
        ps.setDouble(9, value.getOrder_amount());
        ps.setLong(10, value.getOrder_ct());
        ps.setLong(11, value.getTs());
        ps.execute();
    }
}
